方式1 全注解

发送者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * REST controller that publishes a test message through the injected {@link AmqpTemplate}.
 */
@RestController
@RequestMapping("/rabbit")
public class MqSend {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Handles {@code /rabbit/send}: publishes the text "now" followed by the current date,
     * then prints a line to standard output.
     */
    @RequestMapping("/send")
    public void test() {
        // Two-argument convertAndSend: the first argument is the routing key (used here as the
        // queue name), the second is the message body.
        final String payload = "now" + new Date();
        amqpTemplate.convertAndSend("myQueuebingExchange", payload);

        System.out.println("监听队列中~~~~~~~~~~");
    }
}

消费者

1. 需要事先手动创建队列(不推荐)

1
2
3
4
5
6
7
8
@Slf4j
@Component
public class MqReceiver {
// 1 @RabbitListener(queues = "myQueue")
@RabbitListener(queues = "myQueue")
public void process(String message) {
log.info("message={}", message);
}

2 自动创建队列

1
2
3
4
5
6
7
8
//接受消息
@Slf4j
@Component
public class MqReceiver {
// 2 自动创建队列@RabbitListener(queuesToDeclare = @Queue("myQueueDecleare"))
public void process(String message) {
log.info("message={}", message);
}

3 自动创建队列 并自动绑定交换机

1
2
3
4
5
6
7
8
9
10
//接受消息
@Slf4j
@Component
public class MqReceiver {
// 3 自动绑定队列和交换机
@RabbitListener(bindings = @QueueBinding(value = @Queue("myQueuebingExchange"), exchange = @Exchange("myExchange")
))
public void process(String message) {
log.info("message={}", message);
}

4. 自动创建队列,并自动绑定交换机和路由 key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//接受消息
@Slf4j
@Component
public class MqReceiver {
//电子数码接受消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue("computerQueue"),
exchange = @Exchange("computerExchange"),
key = "computerkey"
))
public void processComputer(String message) {
log.info("message={}", message);
}







//水果 接受消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue("fruitQueue"),
key = "furitKey",
exchange = @Exchange("Fruitexchange")
))
public void processFruit(String message) {
log.info("message={}", message);
}

发送方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
 * Publishes "now" followed by the current date using the two-argument convertAndSend
 * (routing key, message), then prints a line to standard output.
 */
@Test
public void sendMessage() {
    final String payload = "now" + new Date();
    amqpTemplate.convertAndSend("myQueuebingExchange", payload);
    System.out.println("监听队列中");
}

/**
 * Publishes an electronics message.
 *
 * The three-argument convertAndSend takes, in order: exchange name, routing key, message.
 * The message therefore has to go to the exchange {@code computerExchange} with routing key
 * {@code computerkey}; passing the queue name as the first argument would address an
 * exchange that is never declared.
 */
@Test
public void sendorder() {
    amqpTemplate.convertAndSend("computerExchange", "computerkey", "now" + new Date() + "dianzi");
    System.out.println("电子产品监听队列中~~~");
}

/**
 * Publishes a fruit message.
 *
 * The three-argument convertAndSend takes, in order: exchange name, routing key, message.
 * The message therefore has to go to the exchange {@code Fruitexchange} with routing key
 * {@code furitKey}; passing the queue name as the first argument would address an
 * exchange that is never declared.
 */
@Test
public void sendfruit() {
    amqpTemplate.convertAndSend("Fruitexchange", "furitKey", "now" + new Date() + "fruit");
    System.out.println("水果监听队列中~~~");
}

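⚠️ 注意:三个参数的 convertAndSend 依次是「交换机名称、路由 key、消息体」。消息先发送到交换机,再由交换机根据路由 key 转发到已绑定的队列,因此发送方需要指定的是交换机名称和路由 key,而不是队列名称。只有两个参数的 convertAndSend(routingKey, message) 才会走默认交换机,此时路由 key 就是队列名称。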


===


方式2:配置类(@Configuration)的形式

* Direct模式 交换机Exchange.

image-20190810163805710

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
 
@Configuration
public class MQConfig {

public static final String QUEUE = "queue";


/**
* Direct模式 交换机Exchange(指定模式,RabbitMQ的默认模式)
* */
@Bean
public Queue queue() {
return new Queue(QUEUE, true);
}
#### sender
1
2
3
4
5
6
7
8
9
10
11
12
@Service
public class MQSender {

private static Logger log = LoggerFactory.getLogger(MQSender.class);

@Autowired
AmqpTemplate amqpTemplate ;

public void send(Object message) {
amqpTemplate.convertAndSend(MQConfig.QUEUE, msg);
log.info("send message:"+msg);
}

receive

1
2
3
4
5
6
@Service
public class MQReceiver {
@RabbitListener(queues=MQConfig.QUEUE)
public void receive(String message) {
log.info("receive message:"+message);
}

Topic模式 交换机Exchange

image-20190810164536249

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Configuration
public class MQConfig {
//对列
public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";
//交换机
public static final String TOPIC_EXCHANGE = "topicExchage";


/**
* Topic模式 交换机Exchange
* */
@Bean
public Queue topicQueue1() {
return new Queue(TOPIC_QUEUE1, true);
}
@Bean
public Queue topicQueue2() {
return new Queue(TOPIC_QUEUE2, true);
}
@Bean
public TopicExchange topicExchage(){
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
public Binding topicBinding1() {
//参数3 路由key
return BindingBuilder.bind(topicQueue1()).to(topicExchage()).with("topic.key1");
}
@Bean
public Binding topicBinding2() {
return BindingBuilder.bind(topicQueue2()).to(topicExchage()).with("topic.#");
}

sender

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Service
public class MQSender {

private static Logger log = LoggerFactory.getLogger(MQSender.class);

@Autowired
AmqpTemplate amqpTemplate ;

public void sendTopic(Object message) {
String msg = RedisService.beanToString(message);
log.info("send topic message:"+msg);
//参数1 队列
//参数2 路由??
amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key1", msg+"1");
amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key2", msg+"2");
}

Receive

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Service
public class MQReceiver {

@RabbitListener(queues=MQConfig.TOPIC_QUEUE1)

public void receiveTopic1(String message) {
log.info(" topic queue1 message:"+message);
}


@RabbitListener(queues=MQConfig.TOPIC_QUEUE2)

public void receiveTopic2(String message) {
log.info(" topic queue2 message:"+message);
}

Fanout模式

image-20190810165303173

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

@Configuration
public class MQConfig {

public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";
public static final String FANOUT_EXCHANGE = "fanoutxchage";


@Bean
public Queue topicQueue1() {
return new Queue(TOPIC_QUEUE1, true);
}
@Bean
public Queue topicQueue2() {
return new Queue(TOPIC_QUEUE2, true);
}

/**
* Fanout模式 交换机Exchange
* */
@Bean
public FanoutExchange fanoutExchage(){
return new FanoutExchange(FANOUT_EXCHANGE);
}
@Bean
public Binding FanoutBinding1() {
return BindingBuilder.bind(topicQueue1()).to(fanoutExchage());
}
@Bean
public Binding FanoutBinding2() {
return BindingBuilder.bind(topicQueue2()).to(fanoutExchage());
}

sender

1
2
3
4
5
6
7
/**
 * Converts the message to a String via {@code RedisService.beanToString}, logs it, and
 * publishes it to the fanout exchange with an empty routing key.
 *
 * @param message the object to send
 */
public void sendFanout(Object message) {
    final String payload = RedisService.beanToString(message);
    log.info("send fanout message:" + payload);
    // Argument 1: exchange name; argument 2: routing key (empty here); argument 3: message.
    amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", payload);
}

Header模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Headers mode: declares the headers exchange named by {@code HEADERS_EXCHANGE}.
 */
@Bean
public HeadersExchange headersExchage() {
    final HeadersExchange exchange = new HeadersExchange(HEADERS_EXCHANGE);
    return exchange;
}
/**
 * Declares the queue named by {@code HEADER_QUEUE}, with the durable flag set to true.
 */
@Bean
public Queue headerQueue1() {
    final boolean durable = true;
    return new Queue(HEADER_QUEUE, durable);
}
/**
 * Binds the header queue to the headers exchange using {@code whereAll(...).match()} over
 * the two header entries {@code header1=value1} and {@code header2=value2}.
 */
@Bean
public Binding headerBinding() {
    final Map<String, Object> requiredHeaders = new HashMap<>();
    requiredHeaders.put("header1", "value1");
    requiredHeaders.put("header2", "value2");

    final Queue queue = headerQueue1();
    final HeadersExchange exchange = headersExchage();
    return BindingBuilder.bind(queue).to(exchange).whereAll(requiredHeaders).match();
}

sender

1
2
3
4
5
6
7
8
9
/**
 * Converts the message to a String, wraps it in an AMQP {@code Message} carrying the
 * headers {@code header1=value1} and {@code header2=value2}, and publishes it to the
 * headers exchange with an empty routing key.
 *
 * @param message the object to send
 */
public void sendHeader(Object message) {
    String msg = RedisService.beanToString(message);
    // The label previously said "fanout", copied from sendFanout.
    log.info("send header message:" + msg);
    MessageProperties properties = new MessageProperties();
    properties.setHeader("header1", "value1");
    properties.setHeader("header2", "value2");
    // Encode with an explicit charset so the bytes do not depend on the JVM's platform default.
    Message obj = new Message(msg.getBytes(java.nio.charset.StandardCharsets.UTF_8), properties);
    amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE, "", obj);
}

Receive

1
2
3
4
/**
 * Logs every message delivered from {@code MQConfig.HEADER_QUEUE}.
 *
 * @param message the raw message body
 */
@RabbitListener(queues = MQConfig.HEADER_QUEUE)
public void receiveHeaderQueue(byte[] message) {
    // Decode with an explicit charset so the result does not depend on the JVM's platform default.
    log.info(" header queue message:" + new String(message, java.nio.charset.StandardCharsets.UTF_8));
}